#!/usr/bin/perl
use strict;
use MIME::Base64;
use threads;
use Thread::Queue;
use Getopt::Long qw(:config bundling no_ignore_case no_autoabbrev passthrough);
use IO::Handle qw();
use Fcntl qw(:flock);
use POSIX;

# ---- Option values, filled from the command line / --parameter-file by getOption and
# ---- normalised by checkOption.
my ($DATABASE_NAME,$PORT,@TABLE_ARRAY,@EX_TABLE_ARRAY,$TABLE_FILE,$EX_TABLE_FILE,@SCHEMA_ARRAY,@EX_SCHEMA_ARRAY,$BATCH_SIZE,$DIRECTORY,$GLOBAL_CONDITION,$TIME_FLAG,$FULL_MODE,$INCREMENT,$RESIDUE,
    $FORCE_REDO,$DATA_ONLY,$NEED_TRUNCATE,$NO_ERROR_TABLE,$ENCODING,$COMPRESS,$TAIL,$PARAMETER_FILE,$IS_HELP,$VERSION);
# Master-side backup directory, per-run log directory, log/done file paths and the open log
# handle; set by processDataDirectory.
my ($MASTER_BACKUP_PATH,$MASTER_LOG_PATH,$MASTER_LOG_FILE,$MASTER_DONE_FILE,$LOG_FILE_HANDLE);
# Fixed file names used inside a backup run's directory.
my ($FULL_FILE_NAME,$LAST_STAT_FILE_NAME,$LOG_FILE_NAME,$DONE_FILE_NAME,$ALLTABLE_FILE_NAME,$FULLFILL_STAT_FILE_NAME) = ("backup_full.tag","last_stat.tag","backup_log.log","backup_done.log","all_table.tag","fullfill.tag");
# Work and message queues; presumably Thread::Queue objects created outside this view.
my ($TASK_QUEUE,$MSG_QUEUE);
# Control-character separators:
#   $SQL_SPLIT    - SQL expression producing the same three bytes as $CMD_SPLIT
#   $CMD_SPLIT    - the Perl-side equivalent
#   $SQL_DELIM    - psql field separator (-F) used by queryResult
#   $RECORD_SPLIT - psql record separator (-R) used by queryResult
my ($SQL_SPLIT,$CMD_SPLIT,$SQL_DELIM,$RECORD_SPLIT) = ('chr(1)||chr(2)||chr(7)',chr(1).chr(2).chr(7),chr(3).chr(4).chr(8),chr(5).chr(6).chr(9).chr(10));
# Worker threads, the message thread, and a lock file handle (exitMain kills/detaches the threads).
my (@TASK_THREAD,$MSG_THREAD,$LOCK_FILE_HANDLE);
# Replication-table lookup and the number of primary segments ($CLUSTER_SIZE is set by checkRandomTable).
my (%HASH_REPLICATION_TABLE,$CLUSTER_SIZE);
# Formatted log lines held by logMessage until the log file is opened.
my (@MESSAGE_CACHE);
# Greenplum major version parsed by getVersion.
my ($DATABASE_VERSION);
# logMessage state: 0 = cache only, 1 = log file just opened (flush cache), 2 = write directly.
my ($WRITE_TO_FILE) = (0);
# Program name with any leading directory stripped, used in messages and help.
(my $CMD_NAME = $0) =~ s!.*/(.*)!$1!;
# Process id left-padded with zeros and cut to its last 6 characters, used in message prefixes.
my $MAIN_PID = substr("000000".$$,-6);

# Bounds and default for -B (number of tables backed up concurrently); $SQL_BATCH is used
# outside this view.
my ($BATCH_MAX,$BATCH_DEFAULT,$BATCH_MIN,$SQL_BATCH) = (32,5,1,300);

# Helper table gp_toolkit.gp_segment_config holds one row per up primary segment
# (content id, data directory, port). checkRandomTable uses the statements below.
# md5 of the table's column-name list; a mismatch with $RANDOM_TABLE_DDL_CHECK_MD5 means the
# table is missing or has a different layout and must be recreated.
my $RANDOM_TABLE_DDL_CHECK_SQL = qq{select md5(string_agg(attname,',' order by attnum)) from pg_attribute where attnum>0 and attrelid=
    (select oid from pg_class where relname='gp_segment_config' and relnamespace=(select oid from pg_namespace where nspname='gp_toolkit'));};
my $RANDOM_TABLE_DDL_CHECK_MD5 = "7a2c712df03bf33fdd5fea8f0a2ac8ed";
# Drops and recreates the helper table, randomly distributed.
my $RANDOM_TABLE_DDL_SCRIPT = qq{drop table if exists gp_toolkit.gp_segment_config;
    create table gp_toolkit.gp_segment_config(database_content int,database_path varchar(128),database_port int)distributed randomly;};
# Compares the helper table with the live up primaries (data directory taken from pg_filespace_entry).
# Returns one row: segment_number (segments answering gp_dist_random('gp_id')), all_number
# (up primaries), no_entry_number (primaries without a matching row) and wrong_number (rows
# stored on a segment other than the content id they record).
my $RANDOM_TABLE_RECORD_CHECK_SQL = qq{select (select count(*) from gp_dist_random('gp_id')) segment_number,
        count(*) all_number, count(*) filter(where sc.database_content is null) no_entry_number,
        count(*) filter(where sc.database_content<>sc.gp_segment_id) wrong_number
    from (select c.content,f.fselocation,c.port,c.hostname from gp_segment_configuration c,pg_filespace_entry f
        where c.dbid=f.fsedbid and c.status='u' and c.role='p' and c.content>=0 and f.fsefsoid=(select oid from pg_filespace where fsname='pg_system')
    ) md left join gp_toolkit.gp_segment_config sc
    on md.content=sc.database_content and md.fselocation=sc.database_path and md.port=sc.database_port;};
# Same check used when $DATABASE_VERSION > 5: the data directory comes from
# gp_segment_configuration.datadir instead of pg_filespace_entry.
my $RANDOM_TABLE_RECORD_CHECK_SQL_V6 = qq{select (select count(*) from gp_dist_random('gp_id')) segment_number,
        count(*) all_number, count(*) filter(where sc.database_content is null) no_entry_number,
        count(*) filter(where sc.database_content <> sc.gp_segment_id) wrong_number
    from (select c.content,c.datadir,c.port,c.hostname from gp_segment_configuration c
        where c.status = 'u' and c.role = 'p' and c.content >= 0
    ) md left join gp_toolkit.gp_segment_config sc
    on md.content = sc.database_content and md.datadir = sc.database_path and md.port = sc.database_port;};
# Fills the helper table. An "execute ... on all" external web table runs a shell snippet on
# every segment; the snippet connects back to its own segment in utility mode and inserts that
# segment's row locally, so each row lives on the segment it describes (checked via wrong_number).
my $RANDOM_TABLE_RECORD_INSERT_SQL = q{drop external table if exists gp_toolkit.gp_segment_config_ext;
create external web table gp_toolkit.gp_segment_config_ext(code text)
execute 'C=`PGOPTIONS="-c gp_session_role=utility" PGPORT=$GP_SEG_PORT PGDATABASE=$GP_DATABASE psql -qtAXc "show gp_contentid"`;
`PGOPTIONS="-c gp_session_role=utility" PGPORT=$GP_SEG_PORT PGDATABASE=$GP_DATABASE psql -qtAXc "insert into gp_toolkit.gp_segment_config select $C,''$GP_SEG_DATADIR'',$GP_SEG_PORT"`;'
on all format 'text';
select count(*) from gp_toolkit.gp_segment_config_ext;
drop external table if exists gp_toolkit.gp_segment_config_ext;};

# Lists the tables to back up.
# Candidates: relkind 'r', not external (relstorage <> 'x'), in public or a namespace with
# oid > 16384, not in a temp schema, and not itself a partition child.
# Output: one row per table, or, for a partitioned table, one row per partition at its deepest
# partition level (rank() over parlevel desc = 1). Columns, in order:
#   parent: AO segment relation name (NULL for non-AO), base64 schema, base64 name, relstorage,
#           latest CREATE/TRUNCATE/ALTER time from pg_stat_last_operation;
#   child:  the same five columns for the partition (NULL when the table is not partitioned).
# Names are base64-encoded with newlines removed so they cannot collide with psql delimiters.
# Rows are ordered by a decode() over the child/parent relpages, descending.
my $GET_PARENT_CHILD_LIST_SQL = q{with all_parent as (
    select c.oid,nspname,relname,relstorage,relpages
    from pg_class c, pg_namespace n
        where c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
        and c.oid not in(select parchildrelid from pg_partition_rule)
), all_child as (
    select parrelid,parchildrelid,nspname,relname,relstorage,relpages from(
        select p.parrelid,pr.parchildrelid,p.parlevel,n.nspname,c.relname,c.relstorage,
            rank() over(partition by parrelid order by parlevel desc) rank,relpages
        from pg_partition p, pg_partition_rule pr, pg_class c, pg_namespace n
        where pr.paroid = p.oid and pr.parchildrelid = c.oid and c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
    ) t where rank = 1
), last_oper as (
    select objid,max(statime) statime from pg_stat_last_operation
    where staactionname in('CREATE','TRUNCATE','ALTER')
    group by objid
), aosegname as(
    select c1.oid,c2.relname from pg_class c1,pg_appendonly a,pg_class c2
    where c1.oid = a.relid and a.segrelid = c2.oid
)
select a1.relname,
    replace(encode(textsend(x.nspname),'base64'),chr(10),''),
    replace(encode(textsend(x.relname),'base64'),chr(10),''),x.relstorage,p.statime::timestamp,
    a2.relname,
    replace(encode(textsend(y.nspname),'base64'),chr(10),''),
    replace(encode(textsend(y.relname),'base64'),chr(10),''),y.relstorage,c.statime::timestamp
from all_parent x
left join all_child y on x.oid = y.parrelid
left join last_oper p on x.oid = p.objid
left join last_oper c on y.parchildrelid = c.objid
left join aosegname a1 on x.oid = a1.oid
left join aosegname a2 on y.parchildrelid = a2.oid
order by decode(y.relpages,y.relpages,x.relpages) desc;};

# Same query and output columns as $GET_PARENT_CHILD_LIST_SQL, but it also skips every table
# whose column list is exactly the external-table error-table layout
# (cmdtime,relname,filename,linenum,bytenum,errmsg,rawdata,rawbytes).
# Presumably selected when --no-error is given (the caller is outside this view).
my $GET_PARENT_CHILD_LIST_NO_ERROR_SQL = q{with all_parent as (
    select c.oid,nspname,relname,relstorage,relpages
    from pg_class c, pg_namespace n
        where c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
        and c.oid not in(select parchildrelid from pg_partition_rule)
        and c.oid not in(select attrelid from pg_attribute
            where attnum>0 group by 1 having string_agg(attname,',' order by attnum) = 'cmdtime,relname,filename,linenum,bytenum,errmsg,rawdata,rawbytes'
        )
), all_child as (
    select parrelid,parchildrelid,nspname,relname,relstorage,relpages from(
        select p.parrelid,pr.parchildrelid,p.parlevel,n.nspname,c.relname,c.relstorage,
            rank() over(partition by parrelid order by parlevel desc) rank,relpages
        from pg_partition p, pg_partition_rule pr, pg_class c, pg_namespace n
        where pr.paroid = p.oid and pr.parchildrelid = c.oid and c.relnamespace = n.oid
        and c.relkind = 'r' and c.relstorage <> 'x'
        and (n.oid > 16384 or n.nspname = 'public')
        and n.nspname not like E'pg\_temp\_%' and n.nspname not like E'pg\_toast\_temp\_%'
    ) t where rank = 1
), last_oper as (
    select objid,max(statime) statime from pg_stat_last_operation
    where staactionname in('CREATE','TRUNCATE','ALTER')
    group by objid
), aosegname as(
    select c1.oid,c2.relname from pg_class c1,pg_appendonly a,pg_class c2
    where c1.oid = a.relid and a.segrelid = c2.oid
)
select a1.relname,
    replace(encode(textsend(x.nspname),'base64'),chr(10),''),
    replace(encode(textsend(x.relname),'base64'),chr(10),''),x.relstorage,p.statime::timestamp,
    a2.relname,
    replace(encode(textsend(y.nspname),'base64'),chr(10),''),
    replace(encode(textsend(y.relname),'base64'),chr(10),''),y.relstorage,c.statime::timestamp
from all_parent x
left join all_child y on x.oid = y.parrelid
left join last_oper p on x.oid = p.objid
left join last_oper c on y.parchildrelid = c.objid
left join aosegname a1 on x.oid = a1.oid
left join aosegname a2 on y.parchildrelid = a2.oid
order by decode(y.relpages,y.relpages,x.relpages) desc;};
# Replicated tables (gp_distribution_policy.policytype = 'r'): base64 schema and base64 name.
my $GET_REPLICATION_TABLE_SQL = q{select replace(encode(textsend(n.nspname),'base64'),chr(10),''),replace(encode(textsend(c.relname),'base64'),chr(10),'')
    from pg_namespace n,pg_class c,gp_distribution_policy p where n.oid = c.relnamespace and c.oid = p.localoid AND policytype='r';};
# Comma-separated argument type names of gp_toolkit.gp_backup_execute, in argument order
# (empty when the function does not exist).
my $EXECUTE_FUNCTION_ARG_CHECK_SQL = q{select string_agg(t.typname,',' order by a.idx) from(
    select proargtypes typs,generate_series(0,array_upper(proargtypes,1)) idx from pg_proc
        where proname='gp_backup_execute' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit')
    )a,pg_type t where t.oid=a.typs[a.idx];};
my $EXECUTE_FUNCTION_ARG_CHECK_VALUE = "varchar";
# md5 of the installed function body. checkExecuteFunction compares it with
# $EXECUTE_FUNCTION_SRC_CHECK_MD5, so that constant must stay in sync with the body below.
my $EXECUTE_FUNCTION_SRC_CHECK_SQL = q{select md5(prosrc) from pg_proc
    where proname='gp_backup_execute' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit');};
my $EXECUTE_FUNCTION_SRC_CHECK_MD5 = "5c9dd8af64ef05f79f06df02e39f3cf2";
# PL/Python (plpythonu) function that runs a shell command on the database host and returns its
# combined output; a non-zero exit status is raised as an error "code:output".
my $EXECUTE_FUNCTION_DDL = q#create or replace function gp_toolkit.gp_backup_execute(command varchar) returns text as $$
import commands
cmd_str=command
try:
    (cod,val)=commands.getstatusoutput(cmd_str)
    if cod!=0:
        plpy.error("%s:%s" % (cod,val))
    else:
        return val
except Exception, e:
    plpy.error(str(e))
$$ language plpythonu;
#;

# Argument-type and body-md5 checks for gp_toolkit.gp_heap_table_stat; the md5 constant must
# match the body of $HEAP_STAT_FUNCTION_DDL below.
my $HEAP_STAT_FUNCTION_ARG_CHECK_SQL = qq{select string_agg(t.typname,',' order by a.idx) from(
    select proargtypes typs,generate_series(0,array_upper(proargtypes,1)) idx from pg_proc
        where proname='gp_heap_table_stat' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit')
    )a,pg_type t where t.oid=a.typs[a.idx];};
my $HEAP_STAT_FUNCTION_ARG_CHECK_VALUE = "int4,varchar,int4,varchar,varchar";
my $HEAP_STAT_FUNCTION_SRC_CHECK_SQL = qq{select md5(prosrc) from pg_proc where proname='gp_heap_table_stat' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit');};
my $HEAP_STAT_FUNCTION_SRC_CHECK_MD5 = "424ee1fa4406bfaa4dab0e0404a05803";
# PL/Python set-returning function. Given a port and data directory and a "schema.table" name, it
# queries the instance on that port in utility mode to resolve the table's relfilenode and
# tablespace. It then builds the data-file path: base/ for tablespace 1663, pg_tblspc/ when
# version > 5, otherwise the filespace location. It yields the "date time" fields of
# `ls --full-time` for the main file and each numbered extension file (.1, .2, ...).
# NOTE(review): scma/rel are substituted into the SQL text without quoting/escaping.
my $HEAP_STAT_FUNCTION_DDL = q#CREATE OR REPLACE FUNCTION gp_toolkit.gp_heap_table_stat(version int,dbname varchar,port int,path varchar, tbname varchar) RETURNS SETOF varchar AS $$
import os
grid=tbname
(scma,rel)=grid.split(".",1)
def getSqlValue(sql):
    cmd="PGOPTIONS='-c gp_session_role=utility' psql -qtAXF '|' -v ON_ERROR_STOP=1 -d '"+dbname+"' -p "+str(port)+" 2>&1 <<_END_OF_SQL\n"+sql+"\n_END_OF_SQL\n"
    try:
        val=os.popen(cmd).read()
        return val.strip()
    except Exception, e:
        plpy.error(str(e))
try:
    relInfoSql="""select reltablespace,relfilenode from pg_class c,pg_namespace n
        where c.relnamespace = n.oid and n.nspname = '%s' and c.relname = '%s'""" % (scma,rel)
    cmdVal=getSqlValue(relInfoSql)
    if cmdVal=="":
        plpy.error("Table not exists %s" % (tbname))
    (relTSP,relFile)=cmdVal.split("|",1)
    filePath=""
    datOid = ""
    datInfoSql="""select oid,dattablespace from pg_database where datname = current_database()"""
    (datOid,datTSP)=getSqlValue(datInfoSql).split("|",1)
    if relTSP=="0":
        relTSP=datTSP
    if relTSP=="1663":
        filePath=path + "/base/" + datOid + "/" + relFile
    else:
        if version>5:
            filePath=path+"/pg_tblspc/"+relTSP+"/GPDB_*/"+datOid+"/"+relFile
        else:
            fspSql="""select n.location_1 from pg_tablespace t,pg_filespace f,gp_persistent_filespace_node n"""
            fspSql+=""" where t.spcfsoid = f.oid and f.oid = n.filespace_oid and t.oid = %s""" % (relTSP)
            filePath=getSqlValue(fspSql) + "/" + relTSP + "/" + datOid + "/" + relFile
    lsCmd="""if [ -f %s ];then ls --full-time %s;fi|awk '{print $6" "$7}'\n""" % (filePath,filePath)
    lsCmd+="""I=0;while true;do I=$((I+1));if [ -f %s.$I ];then ls --full-time %s.$I;else break;fi;done|awk '{print $6" "$7}'""" % (filePath,filePath)
    for tim in os.popen(lsCmd).read().strip().split("\n"):
        yield(tim)
except Exception, e:
    plpy.error(str(e))
$$ language plpythonu;
#;

# Argument-type and body-md5 checks for gp_toolkit.gp_backup_function; the md5 constant must
# match the body of $BACKUP_FUNCTION_DDL below.
my $BACKUP_FUNCTION_ARG_CHECK_SQL = qq{select string_agg(t.typname,',' order by a.idx) from(
    select proargtypes typs,generate_series(0,array_upper(proargtypes,1)) idx from pg_proc
        where proname='gp_backup_function' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit')
    )a,pg_type t where t.oid=a.typs[a.idx];};
my $BACKUP_FUNCTION_ARG_CHECK_VALUE = "varchar,int4,int4,varchar,varchar,int8,varchar,varchar,varchar,int4";
my $BACKUP_FUNCTION_SRC_CHECK_SQL = qq{select md5(prosrc) from pg_proc where proname='gp_backup_function' and pronamespace=(select oid from pg_namespace where nspname='gp_toolkit');};
my $BACKUP_FUNCTION_SRC_CHECK_MD5 = "33d397a1e1a69baccec768dfbb75343a";
# PL/Python function that dumps one table on one instance:
#  - It COPYs "schema"."table" (filtered by whr when non-empty) to STDOUT through a utility-mode
#    psql with the given client encoding.
#  - Output is optionally piped through gzip/zstd -1 into <path>/db_dumps/<dtflag>/<dbname>^<content>^<table>.tmp,
#    which is then renamed to its final name (plus .gz/.zst).
#  - When onnumber != -1 and content != onnumber the COPY statement is empty, so an empty file is written.
#  - It appends BEGIN/SUCCESS/FAILD/ERROR lines to backup_<content>_log.log and returns the final file size.
#  - Any output captured from the shell command (stderr of psql/mv) is treated as a failure.
# NOTE(review): printLog overwrites its `flag` argument with the timestamp, so the status word
# is never written (the timestamp appears twice). Fixing it changes prosrc, so
# $BACKUP_FUNCTION_SRC_CHECK_MD5 would have to be recomputed in the same change.
my $BACKUP_FUNCTION_DDL = q#CREATE OR REPLACE FUNCTION gp_toolkit.gp_backup_function(dbname varchar,port int,content int,path varchar,
    tbname varchar,dtflag bigint,whr varchar,encoding varchar,compress varchar,onnumber int)returns bigint as $$
import os,commands,time
def printLog(flag,str):
    fname="%s/db_dumps/%s/backup_%s_log.log" % (path,dtflag,content)
    flag=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
    handle=open(fname,'a')
    handle.write("%s\t%s\t%s\t%s\t%s\n" % (flag,content,tbname,flag,str.replace("\n","\\\\n").replace('\r','\\\\r').replace('\t','\\\\t')))
    handle.close()
try:
    cmd="mkdir -p %s/db_dumps/%s" % (path,dtflag)
    commands.getstatusoutput(cmd)
except Exception, e:
    plpy.error(str(e))
sign=0
try:
    grid=tbname
    dirc="%s/db_dumps/%s" % (path,dtflag)
    fname="%s^%s^%s" % (dbname,content,grid)
    (scma,rel)=grid.split(".",1)
    comp=" > "+dirc+"/"+fname+".tmp"
    move="mv "+dirc+"/"+fname+".tmp"+" "+dirc+"/"+fname
    tail=""
    if "gzip"==compress:
        comp="|gzip -1"+comp
        move=move+".gz"
        tail=".gz"
    elif "zstd"==compress:
        comp="|zstd -1"+comp
        move=move+".zst"
        tail=".zst"
    copyq="""copy (select * from "%s"."%s"%s) to STDOUT;""" % (scma,rel,whr)
    if ""==whr:
        copyq="""copy "%s"."%s" to STDOUT;""" % (scma,rel)
    if onnumber!=-1 and content!=onnumber:
        copyq=""
    cmd="set -o pipefail\n"+"PGOPTIONS='-c gp_session_role=utility -c client_encoding="+encoding+"' psql -qtAX -v ON_ERROR_STOP=1 -d '"+dbname+"' -p "+str(port)
    cmd+=" <<'_END_OF_SQL'"+comp+"\n"+copyq+"\n_END_OF_SQL\n"+move
    printLog("BEGIN","")
    (cod,val)=commands.getstatusoutput(cmd)
    if len(val)>0:
        printLog("FAILD",val)
        sign=1
        plpy.error(val)
    elif cod!=0:
        printLog("FAILD","Unknown error with code: %s" % (cod))
        sign=1
        plpy.error("Unknown error with code: %s" % (cod))
    else:
        printLog("SUCCESS","")
        return os.path.getsize(dirc+"/"+fname+tail)
except Exception, e:
    if sign==0:
        printLog("ERROR",str(e))
    plpy.error(str(e))
$$ language plpythonu;
#;
############################################################
# Text printed for -h/--help. It must describe exactly the options accepted by getOption.
# (The string is delimited by qq# ... #, so it must not contain a literal hash character.)
my $HELP_MESSAGE = qq#COMMAND NAME: $CMD_NAME
Back up the data of a Greenplum database to files, one file per table, on all segment instances.

Developed by Miao Chen

Work Email:
michen\@pivotal.io
Private Email:
miaochen\@mail.ustc.edu.cn

************************************************************************************************
SYNOPSIS
************************************************************************************************
$CMD_NAME --database database_name
    [--port database_port]
    [-t <schema.relation> [-t <schema.relation>] ...]
    [-T <schema.relation> [-T <schema.relation>] ...]
    [-f file_of_tables_to_include]
    [-F file_of_tables_to_exclude]
    [-s <schema> [-s <schema>] ...]
    [-S <schema> [-S <schema>] ...]
    [-B batch_size]
    [--directory directory]
    [--where condition]
    [--time-flag time_flag]
    [--increment]
    [--residue number]
    [--force-redo]
    [-a]
    [--truncate]
    [--no-error]
    [--encoding encoding]
    [--compress gzip|zstd]
    [--parameter-file filename]
    [-h|--help]
    [--version]
*****************************************************
DESCRIPTION
*****************************************************
The $CMD_NAME utility is used to back up user data in a Greenplum database.
When you start $CMD_NAME, the utility backs up every selected table to local disk.

Backup runs in parallel.
Each table is backed up on all segment instances at the same time,
and several tables are backed up concurrently (see -B).
*****************************************************
OPTIONS
*****************************************************

--database <database>

  Required. The name of the Greenplum database to back up.
  If neither -t nor -f is specified, the utility backs up all user tables in this database.
  eg.:
  --database postgres

--port <master port>

  Master port number. If not specified, the default is 5432.
  eg.:
  --port 5433

-t <schema.relation>

  A table to back up.
  The fully qualified table name must be specified.
  This option can be specified multiple times to include multiple tables.
  If the table does not exist, $CMD_NAME ignores it.
  eg.:
  -t public.backup_a

-T <schema.relation>

  A table not to back up.
  This option can be specified multiple times to exclude multiple tables.
  eg.:
  -T public.ex_a -T public.ex_b

-f <table-file>

  The location and name of a file listing fully qualified names of tables to back up.
  Specify one fully qualified table per line.
  If a table does not exist, $CMD_NAME ignores it.
  Each line has the format:
  schema.relation[;condition]
  This example lists 4 tables to back up:
  public.customer;time>='2016-01-01'
  public.customer_v;district_code='021'
  myschema.orders
  his.orders_his

-F <table-file>

  The location and name of a file listing fully qualified names of tables to exclude.
  Specify one fully qualified table per line.
  If a table does not exist, $CMD_NAME ignores it.
  Each line has the format:
  schema.relation

-s <schema name>

  Back up tables in this schema.
  This option can be specified multiple times to include multiple schemas.
  eg.:
  -s public -s myschema

-S <schema name>

  Do not back up tables in this schema.
  This option can be specified multiple times to exclude multiple schemas.
  eg.:
  -S public -S myschema

-B <batch_size>

  Sets the maximum number of tables that $CMD_NAME backs up concurrently.
  If not specified or out of range, the default is $BATCH_DEFAULT.
  The max is $BATCH_MAX.
  The min is $BATCH_MIN.

--directory <directory>

  Back up table data to this directory.
  All segment hosts (including the master) must have this directory, and the gpadmin user must be able to write to it.
  If not specified, $CMD_NAME backs up to each instance's data directory under db_dumps/.
  eg.:
  --directory /data

--where <condition>

  Condition applied when dumping each table.
  This is a global condition; a condition given for a table in the -f file overrides it.
  Ignored when --increment is specified.
  eg.:
  --where "data_date='2016-01-01'"

--time-flag <14-digit string time>

  Backup time flag.
  Do not specify it except for testing; by default $CMD_NAME uses the current time as the flag.
  eg.:
  --time-flag 20140825082500

--increment

  Back up only tables that have changed.
  Tables that have not changed since the last full backup are skipped.
  Without --increment, a full backup is taken.

--residue <number of increment>

  Before starting the backup, merge old full backups and increment backups.
  If not specified, nothing is merged.
  If specified and multiple full backups exist, everything before the last full backup is cleared.

-a

  Back up data only; no DDL is backed up.

--force-redo

  By default, the backup checks the success log file in the current backup directory
  and skips tables that are listed in it.
  With this option, the success log file is ignored.
  With --increment, some tables may still be skipped because their stat has not changed.

--truncate

  Truncate table data after the table has been backed up successfully.
  Ignored when --increment is specified.

--no-error

  Exclude the error tables of external tables from the backup.

--encoding <encoding>

  Back up data in this encoding. The default is UTF8.

--compress <method>

  Enable compression of backup files.
  Supported methods are gzip and zstd.
  Make sure the compression tool is installed on all hosts.

  eg.:
  --compress gzip
  --compress zstd

--parameter-file <parameter_file>

  Read options from a parameter file, one per line.
  Format:
  name=value
  Like:
  f=backup_table_list
  database=postgres
  ...
  For single-valued options, a value given on the command line takes precedence;
  -t, -T, -s and -S values are added to those given on the command line.

-h|--help

  Displays the online help.

--version

  Displays the command version.

Examples:
$CMD_NAME --database postgres --port 5432 --time-flag 20140825082500 -a
$CMD_NAME --database postgres --port 5432 --increment --time-flag 20140825082500 --force-redo -a
$CMD_NAME -h
#;

# Format and print one message.
# Every flag except "RAW" gets a "YYYYmmdd:HH:MM:SS.<pid>-[FLAG]-:" prefix and a trailing
# newline; "RAW" text is printed untouched. "ERROR" goes to STDERR, everything else to STDOUT.
# Returns the exact text that was printed.
sub printMessage{
    my ($flag,$message) = @_;
    unless("RAW" eq $flag){
        my $prefix = strftime("%Y%m%d:%H:%M:%S.",localtime).$MAIN_PID;
        $message = $prefix."-[".$flag."]-:".$message."\n";
    }
    my $out_handle = ("ERROR" eq $flag) ? \*STDERR : \*STDOUT;
    print {$out_handle} $message;
    return $message;
}
# Print a message (via printMessage) and record it in the master log file.
# Until processDataDirectory opens the log ($WRITE_TO_FILE 0), lines are kept in @MESSAGE_CACHE.
# The first call after the log opens ($WRITE_TO_FILE 1) writes the cache out in order and then
# switches to direct writes ($WRITE_TO_FILE 2).
sub logMessage{
    my ($flag,$message) = @_;
    my $formatted = printMessage($flag,$message);
    if($WRITE_TO_FILE == 1 || $WRITE_TO_FILE == 2){
        if($WRITE_TO_FILE == 1){
            print $LOG_FILE_HANDLE $_ for @MESSAGE_CACHE;
            @MESSAGE_CACHE = ();
            $WRITE_TO_FILE = 2;
        }
        print $LOG_FILE_HANDLE $formatted;
    }else{
        push @MESSAGE_CACHE,$formatted;
    }
}
# Terminate the program with exit status $code.
# The message thread (if any) and then every task thread get a 'KILL' signal and are detached,
# not joined. The master log file is closed if it was opened.
sub exitMain{
    my ($code) = @_;
    my @running = @TASK_THREAD;
    unshift @running,$MSG_THREAD if "" ne $MSG_THREAD;
    $_->kill('KILL')->detach() for @running;
    close($LOG_FILE_HANDLE) if defined $LOG_FILE_HANDLE;
    exit $code;
}
# Report a fatal error: log it with flag ERROR (printed to STDERR), show a one-line usage hint,
# then exit with status 1 through exitMain.
sub errorMessage{
    my $text = shift;
    logMessage("ERROR",$text);
    my $usage = "Usage: $CMD_NAME [-h|--help] [options]\n";
    print $usage;
    exitMain(1);
}
# Return the string with leading and trailing whitespace (including a final newline) removed.
sub trim{
    my $text = shift;
    $text =~ s/^\s+//;
    $text =~ s/\s+$//;
    return $text;
}
# Base64-encode a string as one unbroken line (no embedded or trailing newlines).
sub encode{
    my ($raw) = @_;
    # An empty end-of-line argument stops encode_base64 from inserting line breaks.
    return encode_base64($raw,"");
}
# Decode a Base64 string (the inverse of encode).
sub decode{
    my ($encoded) = @_;
    my $plain = decode_base64($encoded);
    return $plain;
}
# Return 1 if any argument contains a non-word character (regex \W), otherwise 0.
sub illegal{
    my $offenders = grep { /\W/ } @_;
    return $offenders ? 1 : 0;
}
# Make CR, LF and TAB visible by replacing each with a backslash sequence (\r, \n, \t).
sub escape{
    my ($text) = @_;
    my %visible_of = ("\r" => '\r', "\n" => '\n', "\t" => '\t');
    $text =~ s/([\r\n\t])/$visible_of{$1}/g;
    return $text;
}
# Read a text file and return its trimmed lines, skipping blank lines and lines starting with '#'.
# Exits through errorMessage when the file is missing or cannot be opened.
sub readLineFromFile{
    my ($file_path) = @_;
    if(!-e $file_path){
        errorMessage("No file exists named: $file_path");
    }
    # Lexical handle scoped to this call instead of the package-global bareword FILE.
    my $file_handle;
    if(!open($file_handle,"<",$file_path)){
        errorMessage("Can't open file: $file_path: $!");
    }
    my @line_list = ();
    while(my $line = <$file_handle>){
        $line = trim($line);
        next if "" eq $line || $line =~ /^#/;
        push @line_list,$line;
    }
    close($file_handle);
    return @line_list;
}
# Run $query_sql through psql against $DATABASE_NAME on $PORT (optimizer off, UTF8 client
# encoding); psql's stderr is merged into the captured output.
#   $return_flag eq "CV"     -> returns ($exit_code, $output); failures are NOT checked.
#   $return_flag eq "Scalar" -> returns $output (records joined with "\n"); exits via
#                               errorMessage on a non-zero exit code.
#   anything else            -> returns a list of array refs, one per record, with fields split
#                               on $SQL_DELIM; exits via errorMessage on a non-zero exit code.
# NOTE(review): split() drops trailing empty fields, so a row whose last columns are empty/NULL
# yields a shorter array.
sub queryResult{
    my ($query_sql,$return_flag) = @_;
    my $CMDS = "PGDATABASE=$DATABASE_NAME PGPORT=$PORT PGOPTIONS='-c optimizer=off -c client_encoding=UTF8' ";
    # The same multi-byte separator is handed to psql (-R) and used by readpipe to split records,
    # so a plain newline inside a value does not end a record.
    local $/ = $RECORD_SPLIT;
    $CMDS = $CMDS."psql -R '$/' -tAXF '$SQL_DELIM' -v ON_ERROR_STOP=1 2>&1 <<'END_OF_SQL'\n";
    $CMDS = $CMDS.$query_sql."\n";
    $CMDS = $CMDS."END_OF_SQL\n";
    my @result = readpipe($CMDS);
    my $return_code = $? >> 8;
    # Remove $RECORD_SPLIT from the end of every record...
    chomp(@result);
    # ...then, with $/ back to "\n", remove a trailing newline from the last record
    # (presumably the one psql emits after the final record).
    local $/ = chr(10);
    chomp($result[-1]) if (@result > 0);
    return ($return_code,join("\n",@result)) if ("CV" eq $return_flag);
    errorMessage(join("\n",@result)) if ($return_code);
    return join("\n",@result) if ("Scalar" eq $return_flag);
    my @return_list = ();
    for my $row(@result){
        push @return_list,[split(/$SQL_DELIM/,$row)];
    }
    return @return_list;
}
# Parse command-line options, handle --help/--version, then merge --parameter-file.
# Parameter-file lines are "name=value". Single-valued options are filled only when not already
# set on the command line; -t/-T/-s/-S values are appended; switches are turned on.
# Unknown command-line arguments are fatal; unknown parameter-file names are reported and ignored.
sub getOption{
    GetOptions(
        'database:s'       => \$DATABASE_NAME,    'port:i'      => \$PORT,
        't:s'              => \@TABLE_ARRAY,      'T:s'         => \@EX_TABLE_ARRAY,
        'f:s'              => \$TABLE_FILE,       'F:s'         => \$EX_TABLE_FILE,
        's:s'              => \@SCHEMA_ARRAY,     'S:s'         => \@EX_SCHEMA_ARRAY,
        'B:i'              => \$BATCH_SIZE,       'directory:s' => \$DIRECTORY,
        'where:s'          => \$GLOBAL_CONDITION, 'time-flag:i' => \$TIME_FLAG,
        'increment!'       => \$INCREMENT,        'residue:i'   => \$RESIDUE,
        'force-redo!'      => \$FORCE_REDO,       'a!'          => \$DATA_ONLY,
        'truncate!'        => \$NEED_TRUNCATE,    'no-error!'   => \$NO_ERROR_TABLE,
        'encoding:s'       => \$ENCODING,         'compress:s'  => \$COMPRESS,
        'parameter-file:s' => \$PARAMETER_FILE,   'h|help!'     => \$IS_HELP,
        'version!'         => \$VERSION,
    );
    if(@ARGV != 0){
        errorMessage("Some parameters unknown: [@ARGV]\nPlease refer to $CMD_NAME --help");
    }
    if($IS_HELP){
        print $HELP_MESSAGE;
        exitMain(0);
    }
    if($VERSION){
        print "$CMD_NAME 1.5\n";
        exitMain(0);
    }
    if("" ne $PARAMETER_FILE){
        # Single-valued options: the file only supplies values missing from the command line.
        # 'residue' is included so every value-taking command-line option can come from the file.
        my %scalar_option = (
            'database'  => \$DATABASE_NAME,    'port'      => \$PORT,
            'f'         => \$TABLE_FILE,       'F'         => \$EX_TABLE_FILE,
            'B'         => \$BATCH_SIZE,       'directory' => \$DIRECTORY,
            'where'     => \$GLOBAL_CONDITION, 'time-flag' => \$TIME_FLAG,
            'residue'   => \$RESIDUE,          'encoding'  => \$ENCODING,
            'compress'  => \$COMPRESS,
        );
        # Repeatable options: values from the file are appended.
        my %list_option = (
            't' => \@TABLE_ARRAY, 'T' => \@EX_TABLE_ARRAY,
            's' => \@SCHEMA_ARRAY, 'S' => \@EX_SCHEMA_ARRAY,
        );
        # Switches: any occurrence turns the switch on unless the command line already set it.
        my %flag_option = (
            'increment' => \$INCREMENT,     'force-redo' => \$FORCE_REDO, 'a' => \$DATA_ONLY,
            'truncate'  => \$NEED_TRUNCATE, 'no-error'   => \$NO_ERROR_TABLE,
        );
        for my $line(readLineFromFile($PARAMETER_FILE)){
            my ($para,$val) = split(/=/,$line,2);
            ($para,$val) = (trim($para),trim($val));
            if(exists $scalar_option{$para}){
                my $target = $scalar_option{$para};
                $$target = $val if "" eq $$target;
            }elsif(exists $list_option{$para}){
                push @{$list_option{$para}},$val;
            }elsif(exists $flag_option{$para}){
                my $target = $flag_option{$para};
                $$target = 1 if "" eq $$target;
            }else{
                logMessage("NOTICE","Unknown parameter in $PARAMETER_FILE ignored: $para");
            }
        }
    }
    # Drop trailing slashes and whitespace from --directory.
    $DIRECTORY =~ s/\/*\s*$//;
}
# Validate option values and fill in defaults; exits via errorMessage on invalid input.
# Sets $FULL_MODE when --increment is not in effect, clears the data filter and --truncate in
# increment mode, normalises $COMPRESS and derives the file suffix $TAIL.
sub checkOption{
    if("" eq $DATABASE_NAME){
        errorMessage("Please specify parameter: --database");
    }elsif(illegal($DATABASE_NAME)){
        errorMessage("Database name is not legal:[".escape($DATABASE_NAME)."]");
    }
    if("" eq $PORT){
        $PORT = '5432';
    }
    if("" eq $TIME_FLAG){
        $TIME_FLAG = strftime("%Y%m%d%H%M%S",localtime);
    }elsif($TIME_FLAG !~ /\A\d+\z/){
        # The flag becomes part of backup paths that are passed to shell commands. A value read
        # from --parameter-file bypasses Getopt's integer check, so verify it here.
        errorMessage("Time flag is not legal:[".escape($TIME_FLAG)."]");
    }
    if("" eq $BATCH_SIZE || $BATCH_SIZE > $BATCH_MAX || $BATCH_SIZE < $BATCH_MIN){
        logMessage("NOTICE","Not specify or out of limit, use default($BATCH_DEFAULT): -B");
        $BATCH_SIZE = $BATCH_DEFAULT;
    }
    $GLOBAL_CONDITION = "" ne $GLOBAL_CONDITION ? " where ".$GLOBAL_CONDITION : "";
    if(!$INCREMENT){
        # Covers both an absent option and an explicit --noincrement, which sets 0 rather than "".
        logMessage("NOTICE","Not specify --increment, will backup as full mode");
        $FULL_MODE = 1;
    }else{
        logMessage("NOTICE","Increment mode specify, ignore all data filter condition and ignore --truncate");
        $GLOBAL_CONDITION = "";
        $NEED_TRUNCATE = "";
    }
    if($RESIDUE < 0){
        errorMessage("Residue number must not be negative");
    }
    if("" eq $ENCODING){
        $ENCODING = "UTF8";
    }
    logMessage("INFO","Option values: --database $DATABASE_NAME --port $PORT -t ".join(' -t ',@TABLE_ARRAY)." -T ".join(' -T ',@EX_TABLE_ARRAY));
    logMessage("INFO","Option values: -f $TABLE_FILE -F $EX_TABLE_FILE -s ".join(' -s ',@SCHEMA_ARRAY)." -S ".join(' -S ',@EX_SCHEMA_ARRAY));
    logMessage("INFO","Option values: -B $BATCH_SIZE --directory $DIRECTORY --where $GLOBAL_CONDITION --time-flag $TIME_FLAG");
    logMessage("INFO","Option values: --full $FULL_MODE --increment $INCREMENT --residue $RESIDUE --force-redo $FORCE_REDO");
    logMessage("INFO","Option values: -a $DATA_ONLY --truncate $NEED_TRUNCATE --no-error $NO_ERROR_TABLE --encoding $ENCODING --compress $COMPRESS --parameter-file $PARAMETER_FILE");
    $COMPRESS = lc($COMPRESS);
    if($COMPRESS eq ""){
        $COMPRESS = "0";
    }elsif($COMPRESS ne "gzip" && $COMPRESS ne "zstd"){
        errorMessage("Current only support gzip and zstd compression method");
    }
    $TAIL = $COMPRESS eq "gzip" ? ".gz" : $COMPRESS eq "zstd" ? ".zst" : "";
}
# Parse the Greenplum major version from "SELECT version()" into $DATABASE_VERSION.
# If no "Greenplum Database <n>" is found, $DATABASE_VERSION stays unset and a notice is logged.
sub getVersion{
    my $versionString = queryResult("SELECT version();","Scalar");
    # \d+ so a multi-digit major version is captured whole, not just its first digit.
    if($versionString =~ /Greenplum Database (\d+)/){
        $DATABASE_VERSION = int($1);
    }else{
        logMessage("NOTICE","Can't detect Greenplum major version from: $versionString");
    }
}
# Refuse to run (exit via errorMessage) when any instance is not in its preferred role,
# i.e. a failover has happened.
sub checkFailover{
    my $sql = "select count(*) from gp_segment_configuration where role <> preferred_role;";
    my $swapped = queryResult($sql,"Scalar");
    return if $swapped eq 0;
    errorMessage("Can't execute backup when some instance occur failover");
}
# Resolve the master backup directory, create this run's log directory
# <backup path>/<time flag>, open the master log file for append and create the
# last-stat and done files. Then enable writing cached log lines to the log file.
# Exits via errorMessage when the directory or any file cannot be created.
sub processDataDirectory{
    $MASTER_BACKUP_PATH = $DIRECTORY;
    if("" eq $MASTER_BACKUP_PATH){
        $MASTER_BACKUP_PATH = queryResult("show data_directory;","Scalar")."/db_dumps/";
    }else{
        $MASTER_BACKUP_PATH = $MASTER_BACKUP_PATH."/gpseg-1/db_dumps/";
    }
    $MASTER_LOG_PATH = $MASTER_BACKUP_PATH.$TIME_FLAG;
    # Single-quote the path for the shell so spaces or metacharacters in --directory are
    # harmless, and merge stderr so mkdir's reason ends up in the error message.
    (my $quoted_log_path = $MASTER_LOG_PATH) =~ s/'/'\\''/g;
    my $value = readpipe("mkdir -p '$quoted_log_path' 2>&1");
    my $code = $? >> 8;
    if($code != 0){
        errorMessage("Can't create directory $MASTER_LOG_PATH on Master:\n".$value);
    }
    $MASTER_LOG_FILE = $MASTER_LOG_PATH."/".$LOG_FILE_NAME;
    $MASTER_DONE_FILE = $MASTER_LOG_PATH."/".$DONE_FILE_NAME;
    if(-e $MASTER_DONE_FILE){
        printMessage("NOTICE","Backup success log file: $MASTER_DONE_FILE");
    }else{
        printMessage("NOTICE","Try to create backup success log file: $MASTER_DONE_FILE");
    }
    if(!open($LOG_FILE_HANDLE,">>",$MASTER_LOG_FILE)){
        errorMessage("Can't open file: $MASTER_LOG_FILE");
    }
    $LOG_FILE_HANDLE->autoflush(1);
    # Equivalent of `touch` without a shell: create the file if missing (append mode never
    # truncates) and set its access/modification times to now. Failure is reported, not ignored.
    for my $touch_file("$MASTER_LOG_PATH/$LAST_STAT_FILE_NAME",$MASTER_DONE_FILE){
        my $touch_handle;
        if(!open($touch_handle,">>",$touch_file)){
            errorMessage("Can't create file: $touch_file");
        }
        close($touch_handle);
        utime(undef,undef,$touch_file);
    }
    $WRITE_TO_FILE = 1;
}
# Make sure gp_toolkit.gp_segment_config exists with the expected columns and holds exactly one
# correctly placed row per up primary segment.
# If the table is missing or has another layout, it is recreated. If its rows are wrong, they
# are truncated, re-inserted and checked again; a second failure exits via errorMessage.
# Sets $CLUSTER_SIZE to the segment count reported by the check query.
sub checkRandomTable{
    my $check_result = queryResult($RANDOM_TABLE_DDL_CHECK_SQL,"Scalar");
    if($RANDOM_TABLE_DDL_CHECK_MD5 ne $check_result){
        logMessage("NOTICE","No table gp_toolkit.gp_segment_config or need recreate it");
        queryResult($RANDOM_TABLE_DDL_SCRIPT);
    }
    # For version > 5 the data directory comes from gp_segment_configuration.datadir
    # instead of pg_filespace_entry.
    my $record_check_sql = $DATABASE_VERSION > 5 ? $RANDOM_TABLE_RECORD_CHECK_SQL_V6 : $RANDOM_TABLE_RECORD_CHECK_SQL;
    # Run the record check; returns (rows are consistent?, segment_number).
    my $check_records = sub {
        my @check_record_result = queryResult($record_check_sql);
        my ($segment_number,$all_number,$no_entry_number,$wrong_number) = @{$check_record_result[0]};
        my $consistent = $segment_number eq $all_number && $no_entry_number eq 0 && $wrong_number eq 0;
        return ($consistent,$segment_number);
    };
    my ($records_ok,$segment_number) = $check_records->();
    if($records_ok){
        logMessage("INFO","Record in gp_toolkit.gp_segment_config is OK");
    }else{
        logMessage("INFO","Record in gp_toolkit.gp_segment_config incorrect, reinsert");
        queryResult("truncate table gp_toolkit.gp_segment_config;");
        queryResult($RANDOM_TABLE_RECORD_INSERT_SQL);
        ($records_ok,$segment_number) = $check_records->();
        if(!$records_ok){
            errorMessage("Can not create right record in table gp_toolkit.gp_segment_config");
        }
    }
    $CLUSTER_SIZE = $segment_number;
}
# Ensure the plpythonu procedural language is installed in the current
# database. It is created with "create language" when pg_language has no
# row for it.
sub checkLanguage{
    my $present = queryResult("select 1 from pg_language where lanname='plpythonu';","Scalar");
    if("" ne $present){
        logMessage("INFO","Language plpythonu is OK");
        return;
    }
    logMessage("NOTICE","No language plpythonu exists in database, create it");
    queryResult("create language plpythonu;");
}
# Ensure gp_toolkit.gp_execute_function is installed and up to date.
#
# The installed version is accepted only when both its argument signature
# and the md5 of its source match the expected constants. Otherwise the
# existing overload (with the argument list read from the catalog) is
# dropped and $EXECUTE_FUNCTION_DDL is executed.
sub checkExecuteFunction{
    my $installed_args = queryResult($EXECUTE_FUNCTION_ARG_CHECK_SQL,"Scalar");
    my $installed_md5 = queryResult($EXECUTE_FUNCTION_SRC_CHECK_SQL,"Scalar");
    my $up_to_date = $EXECUTE_FUNCTION_ARG_CHECK_VALUE eq $installed_args
        && $EXECUTE_FUNCTION_SRC_CHECK_MD5 eq $installed_md5;
    if($up_to_date){
        logMessage("INFO","Execute function gp_toolkit.gp_execute_function is OK");
        return;
    }
    logMessage("NOTICE","No execute function or need replace, create or replace it");
    queryResult("drop function if exists gp_toolkit.gp_execute_function($installed_args);");
    queryResult($EXECUTE_FUNCTION_DDL);
}
# Ensure gp_toolkit.gp_backup_function is installed and up to date.
#
# Accepted only when both the catalog argument signature and the md5 of the
# function source match the expected constants. Otherwise the installed
# overload is dropped and recreated from $BACKUP_FUNCTION_DDL.
sub checkBackupFunction{
    my $installed_args = queryResult($BACKUP_FUNCTION_ARG_CHECK_SQL,"Scalar");
    my $installed_md5 = queryResult($BACKUP_FUNCTION_SRC_CHECK_SQL,"Scalar");
    my $up_to_date = $BACKUP_FUNCTION_ARG_CHECK_VALUE eq $installed_args
        && $BACKUP_FUNCTION_SRC_CHECK_MD5 eq $installed_md5;
    if($up_to_date){
        logMessage("INFO","Backup function gp_toolkit.gp_backup_function is OK");
        return;
    }
    logMessage("NOTICE","No backup function or need replace, create or replace it");
    queryResult("drop function if exists gp_toolkit.gp_backup_function($installed_args);");
    queryResult($BACKUP_FUNCTION_DDL);
}
# Ensure gp_toolkit.gp_heap_table_stat is installed and up to date.
#
# Accepted only when both the catalog argument signature and the md5 of the
# function source match the expected constants. Otherwise the installed
# overload is dropped and recreated from $HEAP_STAT_FUNCTION_DDL.
sub checkHeapStatFunction{
    my $installed_args = queryResult($HEAP_STAT_FUNCTION_ARG_CHECK_SQL,"Scalar");
    my $installed_md5 = queryResult($HEAP_STAT_FUNCTION_SRC_CHECK_SQL,"Scalar");
    my $up_to_date = $HEAP_STAT_FUNCTION_ARG_CHECK_VALUE eq $installed_args
        && $HEAP_STAT_FUNCTION_SRC_CHECK_MD5 eq $installed_md5;
    if($up_to_date){
        logMessage("INFO","Heap stat function gp_toolkit.gp_heap_table_stat is OK");
        return;
    }
    logMessage("NOTICE","No heap stat function or need replace, create or replace it");
    queryResult("drop function if exists gp_toolkit.gp_heap_table_stat($installed_args);");
    queryResult($HEAP_STAT_FUNCTION_DDL);
}
# Classify the entries of the master backup root, newest first.
#
# Returns ($last_full_time, $merge_time, \@valid_time_list, \@garbage_time_list):
#   - An entry is valid when its directory holds $FULLFILL_STAT_FILE_NAME or
#     when it is the current run ($TIME_FLAG). Entries later than $TIME_FLAG
#     are warned about as "future" and skipped.
#   - Invalid entries that still contain a backup log are garbage. Invalid
#     entries without one are only warned about.
#   - $last_full_time is the newest valid entry containing $FULL_FILE_NAME.
#   - Valid entries newer than that full backup are counted. The
#     ($RESIDUE + 2)-th of them becomes $merge_time.
# Either time is "" when not found.
sub getMergeStat{
    my $listing = readpipe("ls -1L $MASTER_BACKUP_PATH|sort -rn");
    my (@valid, @garbage);
    my $last_full_time = "";
    my $merge_time = "";
    my $newer_than_full = 0;
    for my $stamp(split(/\n/,$listing)){
        my $dir = $MASTER_BACKUP_PATH.$stamp."/";
        my $fullfill_marker = $dir.$FULLFILL_STAT_FILE_NAME;
        my $is_fullfilled = -f $fullfill_marker;
        if(!$is_fullfilled && $stamp != $TIME_FLAG){
            my $log_marker = $dir.$LOG_FILE_NAME;
            if(-f $log_marker){
                logMessage("WARN",$stamp." : Is an invalid backup path and a garbage backup path");
                push @garbage,$stamp;
            }else{
                logMessage("WARN",$stamp." : Is an invalid backup path");
            }
            next;
        }
        if($stamp > $TIME_FLAG){
            logMessage("WARN",$stamp." : Is a future time");
            next;
        }
        my $full_marker = $dir.$FULL_FILE_NAME;
        if("" eq $last_full_time && -f $full_marker){
            $last_full_time = $stamp;
        }
        if("" eq $last_full_time){
            $newer_than_full++;
            $merge_time = $stamp if $newer_than_full == $RESIDUE + 2;
        }
        push @valid,$stamp;
    }
    return ($last_full_time,$merge_time,\@valid,\@garbage);
}
# Build the per-table lines of the segment merge script and write the merged
# last-stat file for $merge_time on the master.
#
# Arguments are the values returned by getMergeStat:
#   $last_full_time, $merge_time, \@valid_time_list (newest first, from
#   "sort -rn"), \@garbage_time_list.
#
# The merged stat list starts with $merge_time's own last-stat lines. Then,
# for each backup directory in [$last_full_time, $merge_time), newest first,
# every stat line is checked. A line is taken when its table is still listed
# in $merge_time's all-table file and has no stat line yet. A taken line is
# appended to the merged list, and a shell line is emitted that moves the
# table's data file from that directory into $merge_time.
# The merged list is written to "<last-stat file>.merge" under $merge_time.
# Then "rm -fr" lines are added for every valid directory older than
# $merge_time and for every garbage directory, followed by a final "".
#
# The emitted shell lines reference $C, $M and $P, which must be defined by
# the script preamble (mergeIncrementToFull sets C=$1, M and P).
# Returns the list of generated lines.
sub getMergeCommand{
    my ($last_full_time,$merge_time,$valid_time_list,$garbage_time_list) = @_;
    # Tables still present at $merge_time, keyed "db.schema.relation"
    # (the file stores each part encoded and joined with '.').
    my @alltable_list = readLineFromFile($MASTER_BACKUP_PATH.$merge_time."/".$ALLTABLE_FILE_NAME);
    my %alltable_hash = ();
    for my $line(@alltable_list){
        my ($dbname,$scma,$rel) = split(/\./,$line);
        ($dbname,$scma,$rel) = (decode($dbname),decode($scma),decode($rel));
        $alltable_hash{$dbname.".".$scma.".".$rel} = "";
    }
    # Seed the merged stat list with $merge_time's own records; field 1 of a
    # record is the plain "db.schema.relation" name.
    my @stat_list = readLineFromFile($MASTER_BACKUP_PATH.$merge_time."/".$LAST_STAT_FILE_NAME);
    my %stat_hash = ();
    for my $line(@stat_list){
        my $table = (split(/;/,$line))[1];
        $stat_hash{$table} = $line;
    }
    my @merge_cmd = ();
    for my $time(@$valid_time_list){
        # Only directories between the last full backup and the merge target.
        if($time < $last_full_time || $time >= $merge_time){
            next;
        }
        my @pre_stat_list = readLineFromFile($MASTER_BACKUP_PATH.$time."/".$LAST_STAT_FILE_NAME);
        for my $line(@pre_stat_list){
            my @array = split(/;/,$line);
            # The last field of a stat record is used as the data file name suffix.
            my ($table,$tail) = ($array[1],$array[-1]);
            # First (i.e. newest) record for a still-existing table wins.
            if(exists $alltable_hash{$table} && !exists $stat_hash{$table}){
                $stat_hash{$table} = "";
                push @stat_list,$line;
                my ($dbname,$scma,$rel) = split(/\./,$table);
                # N=<db>^<content>^<schema>.<rel><tail>; move <time>/N to <merge_time>/N.
                # If the source is missing and the target does not already
                # exist, print "<pwd>/<target> : No such file" and fail.
                push @merge_cmd,'N='.$dbname.'^$C^'.$scma.'.'.$rel."$tail;F=".$time.'/$N;T='.$merge_time.'/$N;if [ -f $F ];then mv $F $T;elif [ ! -f $T ];then echo $P/$T$M;exit 1;fi';
            }
        }
    }
    my $alltable_size = @alltable_list;
    my $merge_size = @merge_cmd;
    logMessage("INFO","Merge data alltable size is: [$alltable_size] and merge size is: [$merge_size]");
    # Write the merged stat list next to the existing one. The caller renames
    # it over the original only after the segment merge succeeded.
    my $merge_stat_file = $MASTER_BACKUP_PATH.$merge_time."/".$LAST_STAT_FILE_NAME.".merge";
    system("touch ".$merge_stat_file);
    my $FILE_HANDLE;
    if(!open($FILE_HANDLE,">",$merge_stat_file)){
        errorMessage("Can't open file: $merge_stat_file");
    }
    $FILE_HANDLE->autoflush(1);
    print $FILE_HANDLE join("\n",@stat_list)."\n";
    close $FILE_HANDLE;
    # Remove every superseded directory and every garbage directory on the segment.
    for my $time(@$valid_time_list){
        if($time < $merge_time){
            push @merge_cmd,'rm -fr '.$time;
        }
    }
    for my $time(@$garbage_time_list){
        push @merge_cmd,'rm -fr '.$time;
    }
    push @merge_cmd,"";
    return @merge_cmd;
}
# Fold older incremental backups into a newer one so the chain since the last
# full backup stays bounded.
#
# Does nothing when $RESIDUE is empty or in full mode. Otherwise getMergeStat
# chooses $merge_time, and a shell script is assembled from a fixed preamble
# plus the lines from getMergeCommand. The script is shipped to every segment
# into <segment path>/db_dumps/merge_command.sh, as base64 chunks of
# $SQL_BATCH lines, via gp_toolkit.gp_backup_execute. It is then run there
# with the segment content id as $1.
# On the master, the merged stat file replaces $merge_time's last-stat file,
# superseded and garbage run directories are removed, and $merge_time is
# marked as a full backup. Any failed step is reported through errorMessage.
sub mergeIncrementToFull{
    if("" eq $RESIDUE){
        return;
    }
    if($FULL_MODE){
        logMessage("INFO","Full mode ignore any merge");
        return;
    }
    my ($last_full_time,$merge_time,$valid_time_list,$garbage_time_list) = getMergeStat();
    if("" eq $last_full_time){
        logMessage("WARN","No full backup found before");
        return;
    }
    if("" eq $merge_time){
        logMessage("WARN","No backup need merge");
        return;
    }
    logMessage("INFO","Start increment merge to data: [$merge_time]");
    my @merge_cmd = ();
    push @merge_cmd,q{#!/bin/bash};
    push @merge_cmd,q{cd $(cd "$(dirname "$0")"; pwd)};
    push @merge_cmd,q{C=$1};
    push @merge_cmd,q{M=" : No such file"};
    push @merge_cmd,q{P=`pwd`};
    push @merge_cmd,qq{mkdir -p $merge_time};
    push @merge_cmd,getMergeCommand($last_full_time,$merge_time,$valid_time_list,$garbage_time_list);
    my $path = "" eq $DIRECTORY ? "database_path" : "'${DIRECTORY}/gpseg'||database_content";
    # Create/truncate the script on every segment. This step and every chunk
    # below are checked: executing a partially written script would move or
    # delete only part of the data.
    my ($code,$value) = queryResult(qq{select gp_toolkit.gp_backup_execute('> '||$path||'/db_dumps/merge_command.sh') from gp_dist_random('gp_toolkit.gp_segment_config');},"CV");
    if($code > 0){
        errorMessage("Create increment merge script failed: [$value]");
    }
    # Chunks never start at the final "" element appended by getMergeCommand.
    # The end of each slice is clamped to the last index so no undef lines
    # are joined in.
    my $cmd_index = 0;
    my $cmd_index_max = @merge_cmd - 1;
    while($cmd_index < $cmd_index_max){
        my $cmd_last = $cmd_index + $SQL_BATCH - 1;
        $cmd_last = $cmd_index_max if $cmd_last > $cmd_index_max;
        my $cmd_string = join("\n",@merge_cmd[$cmd_index .. $cmd_last]);
        # base64 keeps quotes and shell metacharacters out of the SQL literal.
        my $cmd_base64 = encode(trim($cmd_string)."\n");
        my $sql = qq{set log_statement to none;select gp_toolkit.gp_backup_execute('echo "$cmd_base64"|base64 -di >> '||$path||'/db_dumps/merge_command.sh') from gp_dist_random('gp_toolkit.gp_segment_config');};
        my ($chunk_code,$chunk_value) = queryResult($sql,"CV");
        if($chunk_code > 0){
            errorMessage("Write increment merge script failed: [$chunk_value]");
        }
        $cmd_index += $SQL_BATCH;
    }
    ($code,$value) = queryResult(qq{select gp_toolkit.gp_backup_execute('sh '||$path||'/db_dumps/merge_command.sh '||database_content) from gp_dist_random('gp_toolkit.gp_segment_config');},"CV");
    if($code > 0){
        errorMessage("Execute increment merge failed: [$value]");
    }
    # Segments are merged; now mirror the result on the master.
    my $stat_file = $MASTER_BACKUP_PATH.$merge_time."/".$LAST_STAT_FILE_NAME;
    system("mv $stat_file.merge $stat_file");
    for my $time(@$valid_time_list){
        if($time < $merge_time){
            system("rm -fr $MASTER_BACKUP_PATH$time");
        }
    }
    for my $time(@$garbage_time_list){
        system("rm -fr $MASTER_BACKUP_PATH$time");
    }
    system("touch $MASTER_BACKUP_PATH$merge_time/$FULL_FILE_NAME");
}
# Parse a table-list file into a list of [table, where] pairs.
#
# Each trimmed, non-empty line that does not start with '#' has the form
# "schema.table[;where clause]". Both parts are trimmed. In incremental mode
# ($INCREMENT) the where clause is replaced by "". An empty $file_name yields
# an empty list. A missing file is reported through errorMessage.
sub getTableFromFile{
    my ($file_name) = @_;
    my @entries;
    return @entries if "" eq $file_name;
    if(!-e $file_name){
        errorMessage("File not exists named:$file_name");
    }
    for my $raw_line(readLineFromFile($file_name)){
        my $line = trim($raw_line);
        next if "" eq $line || $line =~ /^#/;
        my ($table,$where) = split(/;/,$line,2);
        $where = "" if $INCREMENT;
        push @entries,[trim($table),trim($where)];
    }
    return @entries;
}
# Rewrite this database's section of the run's all-table list.
#
# The file ($MASTER_LOG_PATH/$ALLTABLE_FILE_NAME) holds one line per table,
# "<b64 dbname>.<b64 schema>.<b64 relation>". It may also hold lines for
# other databases backed up into the same run directory. Lines for
# $DATABASE_NAME are deleted, then one line per @TABLE_ARRAY row
# ([ao, schema, relation, ...]) is appended.
sub logAllTableToFile{
    my $alltable_file = $MASTER_LOG_PATH."/".$ALLTABLE_FILE_NAME;
    my $FILE_HANDLE;
    # Open in append mode first so the file exists before "sed -i" runs.
    if(!open($FILE_HANDLE,">>",$alltable_file)){
        errorMessage("Can't open file: $alltable_file");
    }
    close $FILE_HANDLE;
    my $base64_dbname = trim(encode($DATABASE_NAME));
    # '\|...|' makes '|' the sed address delimiter because base64 text may
    # contain '/'. The separator dot is escaped so it matches only '.'; an
    # unescaped '.' also deletes lines of a database whose base64 name
    # extends this one by a character (e.g. "abc" vs "abcd").
    system("sed '\\|^$base64_dbname\\.|d' -i $alltable_file");
    if(!open($FILE_HANDLE,">>",$alltable_file)){
        errorMessage("Can't open file: $alltable_file");
    }
    $FILE_HANDLE->autoflush(1);
    for my $row(@TABLE_ARRAY){
        my (undef,$scma,$rel) = @$row;
        # Trim every field so written lines match the trimmed prefix above.
        my $base64_scma = trim(encode($scma));
        my $base64_rel = trim(encode($rel));
        print $FILE_HANDLE $base64_dbname.".".$base64_scma.".".$base64_rel."\n";
    }
    close $FILE_HANDLE;
}
# Build @TABLE_ARRAY, the list of tables this run will back up.
#
# On entry @TABLE_ARRAY holds the "schema.table" strings given on the command
# line. On exit it holds one arrayref per selected table:
#   [ao, schema, relation, type, time, where]
# where ao is the table's pg_aoseg relation name (used by
# getAOLastStatFromCatalog), type is "h" for heap tables, and time is the
# per-table stat that processLastBackupStat compares with the previous run.
# where is the filter clause from the table file (undef or "" when none).
#
# Steps:
#   1. Load every parent/child row from the catalog
#      ($GET_PARENT_CHILD_LIST_SQL, or the *_NO_ERROR_SQL variant when
#      $NO_ERROR_TABLE is set) and decode the schema/relation names.
#   2. Collect the requested tables (@TABLE_ARRAY, $TABLE_FILE).
#   3. Collect the excluded tables: @EX_TABLE_ARRAY, $EX_TABLE_FILE and,
#      unless $FORCE_REDO, the tables of this database already recorded in
#      $MASTER_DONE_FILE. Also collect the requested and excluded schemas.
#   4. Filter the catalog rows. For each kept row, queue the child table if
#      the row has one, otherwise the parent.
# The result is then written out by logAllTableToFile.
sub processTableArray{
    my @all_parent_child_list = queryResult($GET_PARENT_CHILD_LIST_SQL);
    my $full_size = @all_parent_child_list;
    logMessage("INFO","All table number in database is: [$full_size]");
    if($NO_ERROR_TABLE){
        # Variant query; per the log message below it leaves out error tables.
        @all_parent_child_list = queryResult($GET_PARENT_CHILD_LIST_NO_ERROR_SQL);
        my $ignore_size = $full_size - @all_parent_child_list;
        logMessage("INFO","Ignore error table number is: [$ignore_size]");
    }
    # Schema/relation names arrive encoded; decode them into a fresh list.
    my @all_relation_temp_list = ();
    for my $row(@all_parent_child_list){
        my ($p_ao,$p_scma,$p_rel,$p_typ,$p_tim,$c_ao,$c_scma,$c_rel,$c_typ,$c_tim) = @$row;
        ($p_scma,$p_rel,$c_scma,$c_rel) = (decode($p_scma),decode($p_rel),decode($c_scma),decode($c_rel));
        push @all_relation_temp_list,[($p_ao,$p_scma,$p_rel,$p_typ,$p_tim,$c_ao,$c_scma,$c_rel,$c_typ,$c_tim)];
    }
    @all_parent_child_list = @all_relation_temp_list;
    #####################################################################################################
    # Requested tables: command-line names first, then file entries, which
    # may carry a where clause and overwrite a command-line entry.
    my %specify_table_hash = ();
    for my $table(@TABLE_ARRAY){
        my ($scma,$rel) = split(/\./,$table,2);
        if(illegal($scma,$rel)){
            logMessage("WARN","Table name specify is not legal:[$table]");
            next;
        }
        $specify_table_hash{$table} = "";
    }
    for my $row(getTableFromFile($TABLE_FILE)){
        my ($table,$where) = @$row;
        my ($scma,$rel) = split(/\./,$table,2);
        if(illegal($scma,$rel)){
            logMessage("WARN","Table name in file is not legal:[$table]");
            next;
        }
        $specify_table_hash{$table} = $where;
    }
    #####################################################################################################
    # Excluded tables: command line, exclusion file, and (unless forced to
    # redo) tables of this database already listed in the done file.
    my %ex_table_hash = ();
    for my $table(@EX_TABLE_ARRAY){
        my ($scma,$rel) = split(/\./,$table,2);
        if(illegal($scma,$rel)){
            logMessage("WARN","Ex table name specify is not legal:[$table]");
            next;
        }
        $ex_table_hash{$table} = "";
    }
    for my $row(getTableFromFile($EX_TABLE_FILE)){
        my ($table) = @$row;
        my ($scma,$rel) = split(/\./,$table,2);
        if(illegal($scma,$rel)){
            logMessage("WARN","Table name in file is not legal:[$table]");
            next;
        }
        $ex_table_hash{$table} = ""
    }
    if(!$FORCE_REDO){
        logMessage("INFO","Success log file: $MASTER_DONE_FILE");
        # Done-file records are "<b64 name>;<db>.<schema>.<relation>".
        for my $row(readLineFromFile($MASTER_DONE_FILE)){
            my ($base64_name,$full_name) = split(/;/,$row,2);
            my ($dbname,$table) = split(/\./,$full_name,2);
            if($dbname ne $DATABASE_NAME){
                next;
            }
            $ex_table_hash{$table} = ""
        }
    }
    #####################################################################################################
    my %specify_schema_hash = ();
    for my $schema(@SCHEMA_ARRAY){
        $specify_schema_hash{$schema} = "";
    }
    my %ex_schema_hash = ();
    for my $schema(@EX_SCHEMA_ARRAY){
        $ex_schema_hash{$schema} = "";
    }
    #####################################################################################################
    # Any explicit table/schema request (or a table file) switches from
    # "all tables" to "only what was requested".
    my $check_specify = 0;
    if(keys %specify_table_hash > 0 || keys %specify_schema_hash > 0 || "" ne $TABLE_FILE){
        $check_specify = 1;
        logMessage("INFO","Backup table from parameter or file in database: $DATABASE_NAME");
    }else{
        logMessage("INFO","Backup all user table in database: $DATABASE_NAME");
    }
    @TABLE_ARRAY = ();
    for my $row(@all_parent_child_list){
        my ($p_ao,$p_scma,$p_rel,$p_typ,$p_tim,$c_ao,$c_scma,$c_rel,$c_typ,$c_tim) = @$row;
        my $p_table = $p_scma.".".$p_rel;
        my $c_table = $c_scma.".".$c_rel;
        # A row without a child has $c_table eq ".", so the parent name is reported instead.
        if(illegal($p_scma,$p_rel,$c_scma,$c_rel)){
            logMessage("FAILED","Table name is not legal:[".escape($c_table eq "." ? $p_table : $c_table)."]");
            next;
        }
        # Excluded by parent table, child table, or parent schema.
        if(exists $ex_table_hash{$p_table} || exists $ex_table_hash{$c_table} || exists $ex_schema_hash{$p_scma}){
            next;
        }
        # In selective mode keep only rows whose parent, child, or parent schema was requested.
        if($check_specify && (not exists $specify_table_hash{$p_table}) && (not exists $specify_table_hash{$c_table}) && (not exists $specify_schema_hash{$p_scma})){
            next;
        }
        # Filter clause: the parent's entry, overridden by a non-empty child entry.
        my $where = $specify_table_hash{$p_table};
        if(exists $specify_table_hash{$c_table} && "" ne $specify_table_hash{$c_table}){
            $where = $specify_table_hash{$c_table};
        }
        if("" ne $c_scma){
            push @TABLE_ARRAY,[($c_ao,$c_scma,$c_rel,$c_typ,$c_tim,$where)];
        }else{
            push @TABLE_ARRAY,[($p_ao,$p_scma,$p_rel,$p_typ,$p_tim,$where)];
        }
    }
    #####################################################################################################
    my $log_size = @TABLE_ARRAY;
    logMessage("INFO","All table should be backup be specified: [$log_size]");
    logAllTableToFile();
}
# Populate %HASH_REPLICATION_TABLE with the replicated tables of the current
# database, keyed "schema.relation". Only versions above 5 are queried; the
# schema/relation names returned by $GET_REPLICATION_TABLE_SQL are passed
# through decode() first.
sub getReplicationTable{
    return if $DATABASE_VERSION <= 5;
    for my $pair(queryResult($GET_REPLICATION_TABLE_SQL)){
        my ($encoded_scma,$encoded_rel) = @$pair;
        my ($scma,$rel) = (decode($encoded_scma),decode($encoded_rel));
        $HASH_REPLICATION_TABLE{"$scma.$rel"} = "";
    }
}
# Find the newest earlier backup directory that is marked as a full backup.
#
# Entries under $MASTER_BACKUP_PATH are visited newest first. An entry
# qualifies when it is older than the current run ($TIME_FLAG) and contains
# both the last-stat file and the full-backup marker. Returns its name, or
# "" when no such entry exists.
sub getLastFullBackupDate{
    my $listing = readpipe("ls -1L $MASTER_BACKUP_PATH|sort -rn");
    for my $candidate(split(/\n/,$listing)){
        my $dir = $MASTER_BACKUP_PATH.$candidate."/";
        my $stat_marker = $dir.$LAST_STAT_FILE_NAME;
        next unless -f $stat_marker;
        next if $candidate >= $TIME_FLAG;
        my $full_marker = $dir.$FULL_FILE_NAME;
        next unless -f $full_marker;
        logMessage("INFO","Last full backup time is: [$candidate]");
        return $candidate;
    }
    return "";
}
# Issue a CHECKPOINT before collecting table stats. A non-zero status is
# reported through errorMessage together with the query output.
sub tryCheckpoint{
    my ($status,$output) = queryResult(qq{CHECKPOINT;},"CV");
    if($status ne 0){
        errorMessage("Try to execute checkpoint failed:\n".$output);
        return;
    }
    logMessage("INFO","Try to execute checkpoint success");
}
# Collect the most recent recorded stat of every table of this database.
#
# Reads the last-stat file of every backup directory between $full_time and
# the current run ($TIME_FLAG), newest first. Records have the form
# "<b64 name>;<db>.<schema>.<relation>;<stat>;<count>;...". The first
# (newest) record per table wins.
# Returns a hash "schema.relation" => [stat, count].
sub getLastStatFromFile{
    my ($full_time) = @_;
    my %stat_of_table;
    my $listing = readpipe("ls -1L $MASTER_BACKUP_PATH|sort -rn");
    for my $stamp(split(/\n/,$listing)){
        my $stat_file = $MASTER_BACKUP_PATH.$stamp."/".$LAST_STAT_FILE_NAME;
        next if !-f $stat_file;
        next if $stamp > $TIME_FLAG || $stamp < $full_time;
        for my $record(readLineFromFile($stat_file)){
            my (undef,$full_name,$stat_value,$count) = split(/;/,$record);
            my ($dbname,$table) = split(/\./,$full_name,2);
            next if $dbname ne $DATABASE_NAME;
            next if exists $stat_of_table{$table};
            $stat_of_table{$table} = [$stat_value,$count];
        }
    }
    return %stat_of_table;
}
# Read the current modification counter of every non-heap table in
# @TABLE_ARRAY.
#
# For each such table, coalesce(sum(modcount),0) is selected from its
# pg_aoseg relation (row field 0). The per-table queries are combined with
# UNION ALL into batches of at most $SQL_BATCH. Tables with an empty aoseg
# name are only warned about and get no entry.
# Returns a hash "schema.relation" => modcount.
sub getAOLastStatFromCatalog{
    my %last_stat_map = ();
    my @query_array = ();
    my $all_index = 0;
    my $table_total = @TABLE_ARRAY;
    for my $row(@TABLE_ARRAY){
        my ($ao,$scma,$rel,$typ,$tim) = @$row;
        $all_index += 1;
        if($typ ne "h"){
            if("" eq $ao){
                logMessage("WARN","AO table $scma.$rel missing aoseg table, it may be backup always and may occur error");
            }else{
                push @query_array,"select '$scma','$rel',coalesce(sum(modcount),0) from pg_aoseg.$ao";
            }
        }
        # Flush a full batch, or whatever is pending after the last table.
        # An empty batch (e.g. the list ends with heap tables) is skipped
        # instead of sending an empty SQL string to queryResult.
        if(@query_array && (@query_array >= $SQL_BATCH || $all_index == $table_total)){
            for my $stat_row(queryResult(join("\nUNION ALL\n",@query_array))){
                my ($c_scma,$c_rel,$count) = @$stat_row;
                $last_stat_map{$c_scma.".".$c_rel} = $count;
            }
            @query_array = ();
        }
    }
    return %last_stat_map;
}
# Compute a content fingerprint for every heap ("h") table in @TABLE_ARRAY.
#
# gp_toolkit.gp_heap_table_stat is called on every segment (via
# gp_dist_random('gp_toolkit.gp_segment_config')). The per-segment results
# are aggregated into a single md5 per table, ordered by segment id.
# Per-table queries are combined with UNION ALL in batches of at most 100.
# Returns a hash "schema.relation" => md5.
sub getHeapLastStatFromCatalog{
    logMessage("INFO","Check heap table stat, if heap table amount is larger, it might cost more time......");
    # Fixed batch size for heap stat queries (independent of $SQL_BATCH).
    my $heap_batch = 100;
    my %last_stat_map = ();
    my @query_array = ();
    my $all_index = 0;
    my $table_total = @TABLE_ARRAY;
    for my $row(@TABLE_ARRAY){
        my ($ao,$scma,$rel,$typ,$tim) = @$row;
        $all_index += 1;
        if($typ eq "h"){
            my $query_sql = qq{select '$scma','$rel',md5(string_agg(id||','||t, chr(10) order by id,t)) from(\n};
            $query_sql = $query_sql.qq{select gp_segment_id id,gp_toolkit.gp_heap_table_stat($DATABASE_VERSION,'$DATABASE_NAME',database_port,database_path,'$scma.$rel') t\n};
            $query_sql = $query_sql.qq{from gp_dist_random('gp_toolkit.gp_segment_config'))x};
            push @query_array,$query_sql;
        }
        # Flush a full batch, or whatever is pending after the last table.
        # An empty batch (e.g. the list ends with AO tables) is skipped
        # instead of sending an empty SQL string to queryResult.
        if(@query_array && (@query_array >= $heap_batch || $all_index == $table_total)){
            for my $stat_row(queryResult(join("\nUNION ALL\n",@query_array))){
                my ($c_scma,$c_rel,$md5) = @$stat_row;
                $last_stat_map{$c_scma.".".$c_rel} = $md5;
            }
            @query_array = ();
        }
    }
    return %last_stat_map;
}
# Keep only the tables of @TABLE_ARRAY whose stats changed since they were
# last recorded, and attach the fresh stats to each kept row.
#
# With $INCREMENT, recorded stats since the last full backup come from
# getLastStatFromFile. If no full backup exists, $FULL_MODE is set and
# nothing is compared. Comparison per table:
#   heap ("h"): recorded stat vs the current md5 from
#               getHeapLastStatFromCatalog
#   others:     recorded (stat, count) vs the row's time and the current
#               modcount from getAOLastStatFromCatalog
# A table without a recorded stat is always kept. Output rows are
#   [ao, schema, relation, type, stat, where, count]
# where stat is the md5 for heap tables (count 0) and the row's time
# otherwise.
sub processLastBackupStat{
    my %previous;
    if($INCREMENT){
        my $full_time = getLastFullBackupDate();
        if("" ne $full_time){
            %previous = getLastStatFromFile($full_time);
        }else{
            $FULL_MODE = 1;
            logMessage("WARN","Not found any full backup, will execute full backup this time");
        }
    }
    my %ao_modcount = getAOLastStatFromCatalog();
    my %heap_digest = getHeapLastStatFromCatalog();
    my @changed = ();
    my $unchanged = 0;
    for my $entry(@TABLE_ARRAY){
        my ($ao,$scma,$rel,$typ,$tim,$where) = @$entry;
        my $key = "$scma.$rel";
        my ($prev_stat,$prev_count) = exists $previous{$key} ? @{$previous{$key}} : ();
        if("h" eq $typ){
            my $digest = $heap_digest{$key};
            if("" ne $prev_stat && $prev_stat eq $digest){
                $unchanged++;
            }else{
                push @changed,[$ao,$scma,$rel,$typ,$digest,$where,0];
            }
            next;
        }
        my $modcount = $ao_modcount{$key};
        if("" ne $prev_stat && $prev_stat eq $tim && $modcount eq $prev_count){
            $unchanged++;
        }else{
            push @changed,[$ao,$scma,$rel,$typ,$tim,$where,$modcount];
        }
    }
    logMessage("INFO","Ignore table number for increment stat not change is: [$unchanged]");
    @TABLE_ARRAY = @changed;
}
# Refuse to run while another instance holds the lock. Takes a non-blocking
# exclusive flock on /tmp/.gpmcbackup.lock. The handle lives in the global
# $LOCK_FILE_HANDLE, so the lock is held for as long as that handle stays
# open.
sub checkConflictProcess{
    my $lock_path = "/tmp/.gpmcbackup.lock";
    open($LOCK_FILE_HANDLE,">",$lock_path)
        or errorMessage("Can't open lock file: $lock_path");
    flock($LOCK_FILE_HANDLE, LOCK_EX | LOCK_NB)
        or errorMessage("Lock file is in using, you can try again later");
}
# Worker-thread body. It takes tasks from $TASK_QUEUE until an undef sentinel
# arrives, then pushes an undef sentinel onto $MSG_QUEUE.
#
# A task is a shared array [ao, schema, relation, type, stat, where, count],
# queued by startBackup from @TABLE_ARRAY. For each task a single
# transaction first takes an ACCESS SHARE lock on the table (NOWAIT). It
# then calls gp_toolkit.gp_backup_function on every segment and sums the
# results. Messages sent to $MSG_QUEUE:
#   ("INFO",    "Start backup table <db.schema.rel>")             before the query
#   ("SUCCESS", " SIZE <n> BYTES TIME <s> S", schema, rel, "<stat>;<count>")
#   ("FAILED",  " <query output>", schema, rel)
sub executeBackup{
    $SIG{'KILL'} = sub{threads->exit;};
    my $task = $TASK_QUEUE->dequeue();
    # Per-segment target directory as a SQL expression: the segment's own
    # data directory, or "<DIRECTORY>/gpseg<content id>".
    my $path = "" eq $DIRECTORY ? "database_path" : "'${DIRECTORY}/gpseg'||database_content";
    while(defined $task){
        my ($ao,$scma,$rel,$typ,$tim,$where,$count) = @$task;
        my $table = $scma.".".$rel;
        my $full_name = $DATABASE_NAME.".".$table;
        my $table_qq = qq{"$scma"."$rel"};
        # Replicated tables get one randomly chosen segment index in
        # [0, $CLUSTER_SIZE); -1 for everything else (presumably "all
        # segments" inside gp_backup_function - TODO confirm).
        my $on_number = "-1";
        if(exists $HASH_REPLICATION_TABLE{$table}){
            srand;
            $on_number = int(rand($CLUSTER_SIZE));
        }
        # No per-table clause: fall back to $GLOBAL_CONDITION as-is. Then
        # double single quotes, because the clause is embedded in a SQL
        # string literal below.
        $where = "" eq $where ? $GLOBAL_CONDITION : " where ".$where;
        $where =~ s/'/''/g;
        # A fresh shared array per iteration: the queue keeps a reference to it.
        my @stat_msg :shared;
        my $query = qq{begin;LOCK TABLE $table_qq IN ACCESS SHARE MODE NOWAIT;};
        $query = $query.qq{select sum(gp_toolkit.gp_backup_function('$DATABASE_NAME',database_port,database_content,};
        $query = $query.qq{$path,'$table',$TIME_FLAG,'$where','$ENCODING','$COMPRESS',$on_number)) from gp_dist_random('gp_toolkit.gp_segment_config');};
        $query = $query.qq{end;};
        my @info_msg :shared = ("INFO","Start backup table $full_name");
        $MSG_QUEUE->enqueue(\@info_msg);
        my $start = time();
        my ($code,$value) = queryResult($query,"CV");
        my $duration = time() - $start;
        if($code eq 0){
            # NOTE(review): assumes the summed byte count is on line 3 of the
            # "CV" output - TODO confirm against queryResult.
            my $bytes = (split(/\n/,$value))[2];
            @stat_msg= ("SUCCESS"," SIZE $bytes BYTES TIME $duration S",$scma,$rel,$tim.";".$count);
        }else{
            @stat_msg = ("FAILED"," ".$value,$scma,$rel);
        }
        $MSG_QUEUE->enqueue(\@stat_msg);
        $task = $TASK_QUEUE->dequeue();
    }
    # Tell executeMessage this worker is finished.
    $MSG_QUEUE->enqueue(undef);
}
# Message-consumer thread body. It drains $MSG_QUEUE, logs progress and
# maintains the run's bookkeeping files. It stops after one undef sentinel
# per worker ($BATCH_SIZE in total) has been received.
#
# Messages (produced by executeBackup):
#   ("SUCCESS", text, schema, rel, "<stat>;<count>")
#       - replace the table's line in the last-stat file with
#         "<b64 name>;<db.schema.rel>;<stat>;<count>;<TAIL>"
#       - replace its line in $MASTER_DONE_FILE with "<b64 name>;<db.schema.rel>"
#       - truncate the source table when $NEED_TRUNCATE is set
#   ("FAILED", text, schema, rel)
#       - remove the table from the all-table list. A "relation ... does not
#         exist" error is logged as DROPPED and counted as success; any
#         other error is counted as a failure.
#   (any other type, text) - logged as-is
#
# Returns the number of failed tables. When $FULL_MODE is set and nothing
# failed, the run directory is also marked as a full backup.
sub executeMessage{
    $SIG{'KILL'} = sub{threads->exit;};
    my ($end_index,$error_index,$success_index) = (0,0,0);
    my $tables_size = @TABLE_ARRAY;
    my $last_stat_file = "$MASTER_LOG_PATH/$LAST_STAT_FILE_NAME";
    my $alltable_file = "$MASTER_LOG_PATH/$ALLTABLE_FILE_NAME";
    my $msg = $MSG_QUEUE->dequeue();
    while(1){
        if(defined $msg){
            my ($type,$text,$scma,$rel,$last_stat) = @$msg;
            my $full_name = $DATABASE_NAME.'.'.$scma.'.'.$rel;
            if("SUCCESS" eq $type){
                $success_index += 1;
                my $base64_name = trim(encode($full_name));
                my $stat_record = $base64_name.";".$full_name.";".$last_stat.";".$TAIL;
                # sed addresses use '\|...|' instead of '/.../'. Base64 text may
                # contain '/', which would end the address early and make sed
                # fail; the echo would then still append a duplicate line.
                system(qq{sed '\\|^$base64_name;|d' -i $last_stat_file\necho '$stat_record' >> $last_stat_file});
                my $done_record = $base64_name.";".$full_name;
                system(qq{sed '\\|^$base64_name;|d' -i $MASTER_DONE_FILE\necho '$done_record' >> $MASTER_DONE_FILE});
                logMessage($type,"($success_index/$error_index/$tables_size) $full_name".$text);
                if($NEED_TRUNCATE){
                    my $table_qq = qq{"$scma"."$rel"};
                    queryResult(qq{truncate table $table_qq;},"CV");
                }
            }elsif("FAILED" eq $type){
                # Trimmed like the all-table prefix written in logAllTableToFile.
                my $base64_dbname = trim(encode($DATABASE_NAME));
                my $base64_scma = trim(encode($scma));
                my $base64_rel = trim(encode($rel));
                # '|' delimiter for the same reason as above; the separator
                # dots are escaped so they match only a literal '.'.
                system(qq{sed '\\|^$base64_dbname\\.$base64_scma\\.$base64_rel\$|d' -i $alltable_file});
                if($text =~ /relation .+does not exist/){
                    $success_index += 1;
                    logMessage("DROPPED","($success_index/$error_index/$tables_size) $full_name".$text);
                }else{
                    $error_index += 1;
                    logMessage($type,"($success_index/$error_index/$tables_size) $full_name".$text);
                }
            }else{
                logMessage($type,$text);
            }
        }else{
            # One undef per finished worker thread.
            $end_index += 1;
            if($end_index == $BATCH_SIZE){
                last;
            }
        }
        $msg = $MSG_QUEUE->dequeue();
    }
    if($FULL_MODE && $error_index == 0){
        logMessage("INFO","Success execute full backup");
        system("touch $MASTER_LOG_PATH/$FULL_FILE_NAME");
    }
    return $error_index;
}
# Back up the DDL (unless $DATA_ONLY), then the table data in parallel.
#
# DDL: gpddlbackup writes "<MASTER_LOG_PATH>/<db>.ddl.txt". Any non-zero
# wait status is reported through errorMessage with its output.
# Data: an empty @TABLE_ARRAY ends the run through exitMain (0 in
# incremental mode, 11 otherwise). Otherwise every row is queued on
# $TASK_QUEUE, followed by one undef sentinel per worker. $BATCH_SIZE
# executeBackup threads consume the task queue, and one executeMessage
# thread consumes $MSG_QUEUE.
# Returns the executeMessage result (number of failed tables).
sub startBackup{
    if(!$DATA_ONLY){
        logMessage("INFO","Backup DDLs and OBJECTs and AUTHORIZATIONs");
        my $output = readpipe("gpddlbackup --database $DATABASE_NAME --port $PORT -f $MASTER_LOG_PATH/$DATABASE_NAME.ddl.txt 2>&1");
        # Check the whole wait status: "$? >> 8" is 0 for a gpddlbackup that
        # was killed by a signal, which would pass as success.
        if($? != 0){
            errorMessage("Backup DDLs occur error:\n".$output);
        }
    }
    my $tables_size = @TABLE_ARRAY;
    if(0 == $tables_size){
        logMessage("INFO","No table will be backup, exit");
        if($INCREMENT){
            exitMain(0);
        }else{
            exitMain(11);
        }
    }else{
        logMessage("INFO","Number of tables should be backup is: $tables_size");
    }
    $TASK_QUEUE = Thread::Queue->new();
    $MSG_QUEUE = Thread::Queue->new();
    # Queue elements must be shared to cross thread boundaries.
    for my $table_scalar(@TABLE_ARRAY){
        my @shared_array :shared = @$table_scalar;
        $TASK_QUEUE->enqueue(\@shared_array);
    }
    # All tasks precede all sentinels, so each worker stops on exactly one undef.
    for my $index(0 .. $BATCH_SIZE - 1){
        $TASK_QUEUE->enqueue(undef);
        my $task_thread = threads->new(\&executeBackup);
        push @TASK_THREAD,$task_thread;
    }
    $MSG_THREAD = threads->new(\&executeMessage);
    for my $thread(@TASK_THREAD){
        $thread->join();
    }
    @TASK_THREAD = ();
    my $value = $MSG_THREAD->join();
    $MSG_THREAD = "";
    return $value;
}
# Entry point: run every backup phase in order and finish through exitMain.
#
# $_[0] is the invoking command line and is only logged. Exit codes set here:
# 33 when any table failed, 0 on full success. startBackup may exit earlier
# with 0 or 11 when there is nothing to back up.
sub main{
    getOption();
    logMessage("INFO","Start backup process".("." x 66));
    logMessage("INFO","Run command: ".$_[0]);
    # Remove PGOPTIONS from this process's environment so child processes
    # started afterwards (e.g. gpddlbackup in startBackup) do not inherit it.
    # The former system("unset PGOPTIONS") only affected a throwaway shell.
    delete $ENV{PGOPTIONS};
    checkOption();
    getVersion();
    checkFailover();
    checkConflictProcess();
    processDataDirectory();
    logMessage("INFO","Backup directory is $MASTER_LOG_PATH");
    # Install or verify the helper objects used by the backup queries.
    checkRandomTable();
    checkLanguage();
    checkExecuteFunction();
    checkBackupFunction();
    checkHeapStatFunction();
    mergeIncrementToFull();
    # Select tables and keep only those that changed (in incremental mode).
    processTableArray();
    getReplicationTable();
    tryCheckpoint();
    processLastBackupStat();
    my $error_count = startBackup();
    if($error_count != 0){
        logMessage("INFO","Finish backup with failed......");
        exitMain(33);
    }else{
        # Mark the run directory as complete (getMergeStat treats it as valid).
        system("touch ".$MASTER_BACKUP_PATH.$TIME_FLAG."/".$FULLFILL_STAT_FILE_NAME);
        logMessage("INFO","Finish backup with all success......");
        exitMain(0);
    }
}
# Script entry: remember the invoking command line for the log, make STDOUT
# and STDERR unbuffered so progress appears immediately, then run main.
my $command_string = "$0 ".join(" ",@ARGV);
$_->autoflush(1) for (\*STDOUT, \*STDERR);
main($command_string);
